| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 |
2x
2x
2x
2x
603x
603x
603x
603x
2x
633x
2x
630x
132x
132x
98x
132x
96x
84x
84x
84x
84x
84x
84x
84x
84x
84x
84x
84x
84x
84x
84x
132x
132x
132x
84x
84x
84x
2x
| /**
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as api from '../protos/firestore_proto_api';
import { CredentialsProvider } from '../api/credentials';
import { maybeDocumentMap } from '../model/collections';
import { MaybeDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation, MutationResult } from '../model/mutation';
import { assert } from '../util/assert';
import { AsyncQueue } from '../util/async_queue';
import { Connection } from './connection';
import {
PersistentListenStream,
PersistentWriteStream
} from './persistent_stream';
import { JsonProtoSerializer } from './serializer';
// The generated proto interfaces for these classes are missing the database
// field, so we add it here.
// TODO(b/36015800): Remove this once the api generator is fixed.
/**
 * Request payload for the `BatchGetDocuments` RPC: the generated
 * `api.BatchGetDocumentsRequest` plus the optional `database` field.
 */
interface BatchGetDocumentsRequest extends api.BatchGetDocumentsRequest {
// Populated by Datastore.lookup() with the serializer's encodedDatabaseId.
database?: string;
}
/**
 * Request payload for the `Commit` RPC: the generated `api.CommitRequest`
 * plus the optional `database` field.
 */
interface CommitRequest extends api.CommitRequest {
// Populated by Datastore.commit() with the serializer's encodedDatabaseId.
database?: string;
}
/**
 * Datastore is a wrapper around the external Google Cloud Datastore grpc API,
 * which provides an interface that is more convenient for the rest of the
 * client SDK architecture to consume.
 *
 * It hands out persistent write/watch streams and issues the one-shot
 * `Commit` and `BatchGetDocuments` RPCs, fetching an auth token from the
 * credentials provider before each one-shot RPC.
 */
export class Datastore {
  constructor(
    private queue: AsyncQueue,
    private connection: Connection,
    private credentials: CredentialsProvider,
    private serializer: JsonProtoSerializer
  ) {}

  /**
   * Creates a new write stream sharing this Datastore's queue, connection,
   * credentials provider and serializer.
   */
  newPersistentWriteStream(): PersistentWriteStream {
    return new PersistentWriteStream(
      this.queue,
      this.connection,
      this.credentials,
      this.serializer
    );
  }

  /**
   * Creates a new watch (listen) stream sharing this Datastore's queue,
   * connection, credentials provider and serializer.
   */
  newPersistentWatchStream(): PersistentListenStream {
    return new PersistentListenStream(
      this.queue,
      this.connection,
      this.credentials,
      this.serializer
    );
  }

  /**
   * Sends the given mutations to the backend in a single `Commit` RPC.
   *
   * @param mutations The mutations to serialize into the request's `writes`.
   * @returns A promise resolving to the mutation results that the serializer
   *     decodes from the response's `writeResults`.
   */
  commit(mutations: Mutation[]): Promise<MutationResult[]> {
    const params: CommitRequest = {
      database: this.serializer.encodedDatabaseId,
      writes: mutations.map(m => this.serializer.toMutation(m))
    };
    return this.invokeRPC<CommitRequest, api.CommitResponse>(
      'Commit',
      params
    ).then(response => {
      return this.serializer.fromWriteResults(response.writeResults);
    });
  }

  /**
   * Fetches the documents for the given keys with a `BatchGetDocuments` RPC.
   *
   * @param keys The keys of the documents to look up.
   * @returns A promise resolving to one entry per requested key, in the same
   *     order as `keys`. Asserts that the response contained an entry for
   *     every requested key.
   */
  lookup(keys: DocumentKey[]): Promise<MaybeDocument[]> {
    const params: BatchGetDocumentsRequest = {
      database: this.serializer.encodedDatabaseId,
      documents: keys.map(k => this.serializer.toName(k))
    };
    return this.invokeStreamingRPC<
      BatchGetDocumentsRequest,
      api.BatchGetDocumentsResponse
    >('BatchGetDocuments', params).then(response => {
      // Index the streamed responses by key first, so the result can be
      // rebuilt in the caller's key order rather than in response order.
      let docs = maybeDocumentMap();
      response.forEach(proto => {
        const doc = this.serializer.fromMaybeDocument(proto);
        docs = docs.insert(doc.key, doc);
      });
      const result: MaybeDocument[] = [];
      keys.forEach(key => {
        const doc = docs.get(key);
        // This is a read (BatchGetDocuments) response, so the message names
        // that RPC rather than a write response.
        assert(
          !!doc,
          'Missing entity in BatchGetDocuments response for ' + key
        );
        result.push(doc!);
      });
      return result;
    });
  }

  /** Gets an auth token and invokes the provided RPC. */
  private invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
    // TODO(mikelehen): Retry (with backoff) on token failures?
    return this.credentials.getToken(/*forceRefresh=*/ false).then(token => {
      return this.connection.invokeRPC<Req, Resp>(rpcName, request, token);
    });
  }

  /** Gets an auth token and invokes the provided RPC with streamed results. */
  private invokeStreamingRPC<Req, Resp>(
    rpcName: string,
    request: Req
  ): Promise<Resp[]> {
    // TODO(mikelehen): Retry (with backoff) on token failures?
    return this.credentials.getToken(/*forceRefresh=*/ false).then(token => {
      return this.connection.invokeStreamingRPC<Req, Resp>(
        rpcName,
        request,
        token
      );
    });
  }
}
|